import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statistics import mean
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import plotly.io as pio
# Use pio to ensure plotly plots render in Quarto
pio.renderers.default = 'notebook_connected'
Helper Functions
import plotly.graph_objects as go
def plot_3d_regression(model, model_name, data_type='linear'):
# Make Standard Data
X, y = make_regression(
n_samples=300, # Number of samples
n_features=2, # Number of features
noise=15, # Add noise to make it realistic
random_state=42 # Set seed for reproducibility
)
    # Add non-linear structure to the target
    if data_type.lower() == 'non-linear':
        y = y + 500 * np.sin(X[:, 0]) + 250 * np.cos(X[:, 1])
    # Make messier, non-standard "ugly" data
    if data_type.lower() == 'ugly':
# Add non-linear transformations
y = y + 500 * np.sin(X[:, 0]) * np.cos(X[:, 1]) \
+ 300 * (X[:, 0] ** 2) \
- 200 * np.exp(-X[:, 1] ** 2)
# Add random feature interactions
y = y + 100 * (X[:, 0] * X[:, 1])
# Add random noise to make it "uglier"
y = y + np.random.normal(0, 150, size=y.shape)
model.fit(X, y)
# Create a mesh grid for the features
x_grid, y_grid = np.meshgrid(np.linspace(min(X[:, 0]), max(X[:, 0]), 100),
np.linspace(min(X[:, 1]), max(X[:, 1]), 100))
grid = np.vstack((x_grid.flatten(), y_grid.flatten())).T
predictions = model.predict(grid)
# Create 3D scatter plot for training data
scatter = go.Scatter3d(
x=X[:, 0], y=X[:, 1], z=y,
mode='markers', marker=dict(color='blue', size=5), name='Training Data'
)
# Create 3D surface plot for the regression surface
    surface = go.Surface(
        x=x_grid, y=y_grid, z=predictions.reshape(x_grid.shape),
        opacity=0.5, colorscale='Viridis', name='Regression Surface'
    )
# Combine both traces into one figure
fig = go.Figure(data=[scatter, surface])
# Update layout for better visualization
fig.update_layout(
title=f'Training Data and Regression Surface for {model_name}',
scene=dict(
xaxis_title='Feature 1',
yaxis_title='Feature 2',
zaxis_title='Target'
)
)
# Show plot
    fig.show()
Linear Regression
OLS
class OLSRegression():
    # Initialize the class
    def __init__(self):
        pass
def fit(self, X, y):
'''Fit the regression to the X data via the OLS equation'''
        # Add a leading column of 1s to the X data to account for the bias term
X = np.hstack((np.ones((X.shape[0], 1)), X))
        # Solve the normal equation (X.T @ X) @ beta = X.T @ y
        # (np.linalg.solve is more stable than explicitly inverting X.T @ X)
        ols = np.linalg.solve(X.T.dot(X), X.T.dot(y))
self.coef = ols[1:]
self.bias = ols[0]
def predict(self, X):
'''Predict new data with the trained coefficients and bias'''
# Check if the X data is 1D and reshape if needed
if X.ndim == 1:
X = X.reshape(-1, 1)
# Make predictions by dotting the new data with the coefficients and adding the bias
self.predictions = X.dot(self.coef) + self.bias
        return self.predictions
ols = OLSRegression()
plot_3d_regression(ols, model_name='OLS', data_type='linear')
plot_3d_regression(ols, model_name='OLS', data_type='ugly')
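As a quick sanity check (an addition, not part of the original walkthrough), the closed-form fit above should agree with sklearn's LinearRegression on the same data; names like X_check are illustrative.
# Added sanity check: the normal-equation fit should match sklearn's LinearRegression
from sklearn.linear_model import LinearRegression
X_check, y_check = make_regression(n_samples=300, n_features=2, noise=15, random_state=42)
ols_check = OLSRegression()
ols_check.fit(X_check, y_check)
sk_ols = LinearRegression().fit(X_check, y_check)
print(np.allclose(ols_check.coef, sk_ols.coef_))       # expected: True
print(np.allclose(ols_check.bias, sk_ols.intercept_))  # expected: True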
Gradient Descent Regression
class GDRegression():
def __init__(self, epochs, eta):
'''Initialize the Gradient Descent Regression Class'''
self.epochs = epochs
self.eta = eta
    def fit(self, X, y, batch_size=None):
        '''Train the Gradient Descent Regression Class'''
        # Default to full-batch gradient descent when no batch size is given
        if batch_size is None:
            batch_size = X.shape[0]
# Create random initialization for the bias and coefficients
bias = np.random.random()
coef = np.random.random(X.shape[1])
# Iterate through each epoch
        for _ in range(self.epochs):
indices = np.random.choice(X.shape[0], size=min(batch_size, len(y)), replace=False)
X_batch = X[indices]
y_batch = y[indices]
# Make predictions for the X data being trained on
y_hat = X_batch.dot(coef) + bias
            # Calculate the derivative w.r.t. the bias and coef given the predictions
            derr_b = 2 / X_batch.shape[0] * np.sum(y_hat - y_batch)
            derr_c = 2 / X_batch.shape[0] * X_batch.T.dot(y_hat - y_batch)
            # Update the bias and the coef based on the derivatives
bias = bias - derr_b * self.eta
coef = coef - derr_c * self.eta
# Finalize the bias and coef
self.bias = bias
self.coef = coef
def predict(self, X):
'''Predict new data given the learned bias and coef'''
predictions = X.dot(self.coef) + self.bias
return predictions
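Before fitting and plotting, a small added convergence check (illustrative names): with full-batch updates on well-scaled data, gradient descent should land very close to the closed-form OLS solution.
# Added convergence check: full-batch GD should approach the OLS solution
X_gd, y_gd = make_regression(n_samples=300, n_features=2, noise=15, random_state=42)
gd_check = GDRegression(epochs=10000, eta=0.01)
gd_check.fit(X_gd, y_gd)  # batch_size defaults to the full dataset
ols_ref = OLSRegression()
ols_ref.fit(X_gd, y_gd)
print(np.allclose(gd_check.coef, ols_ref.coef, atol=1e-2))  # expected: True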
gd_reg = GDRegression(epochs=10000, eta=0.01)
plot_3d_regression(gd_reg, 'Gradient Descent', data_type='linear')
plot_3d_regression(gd_reg, 'Gradient Descent', data_type='ugly')
KNN Regression
class KNNRegressor():
def __init__(self, n_neighbors=5):
'''Initialize the regressor with a defined number of nearest neighbors'''
self.n_neighbors = n_neighbors
def fit(self, X, y):
'''Train the regressor by loading in all X and y data'''
self.X = X
self.y = y
    def predict(self, X):
        '''Make predictions based on the training data using Euclidean distance'''
        predictions = []
        # For each test point...
        for test_point in X:
            # Calculate the distance between the test point and all training points
            distances = np.linalg.norm(self.X - test_point, axis=1)
            # Find the n_neighbors closest points
            closest_points_indices = np.argsort(distances)[:self.n_neighbors]
            # Use the mean of the closest points as the prediction
            predictions.append(np.mean(self.y[closest_points_indices]))
        return np.array(predictions)
knn_regressor = KNNRegressor()
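The n_neighbors parameter controls the bias-variance trade-off: k=1 memorizes the training data, while a large k over-smooths. Here is a small added comparison on a held-out split (illustrative names, reusing the train_test_split and mean_squared_error imports from the top).
# Added comparison of a few k values on a held-out split
X_knn, y_knn = make_regression(n_samples=300, n_features=2, noise=15, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X_knn, y_knn, test_size=0.25, random_state=42)
for k in (1, 5, 25):
    knn_k = KNNRegressor(n_neighbors=k)
    knn_k.fit(X_tr, y_tr)
    print(k, mean_squared_error(y_te, knn_k.predict(X_te)))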
plot_3d_regression(knn_regressor, "K-Nearest Neighbors Regression", data_type='linear')
plot_3d_regression(knn_regressor, "K-Nearest Neighbors Regression", data_type='ugly')
Decision Tree Regression
class DecisionTreeRegressor():
def __init__(self, max_depth=None):
# Initializes the decision tree regressor with an optional maximum depth for the tree.
self.max_depth = max_depth
# Function for calculating the Mean Squared Error (MSE) of a split
def mse(self, y):
# MSE is calculated as the average of squared differences from the mean
return np.mean((y - np.mean(y)) ** 2)
# Function to find the best split at any given point (based on MSE)
    def _best_split(self, X, y):
        # Track the best split found with local variables so repeated calls stay independent
        best_mse = float('inf')  # Best MSE starts as infinity
        best_feature = None
        best_split_value = None
        best_left_y = None
        best_right_y = None
# Loop over each feature in the dataset
for feature_num in range(X.shape[1]):
# Get unique values in the feature column to test potential splits
feature_values = np.unique(X[:, feature_num])
# For each unique value, try splitting the data
for value in feature_values:
# Find indices where the feature values are less than or equal to the split value
left_index = X[:, feature_num] <= value
right_index = X[:, feature_num] > value
# Split the target values accordingly
left_targets = y[left_index]
right_targets = y[right_index]
# Proceed only if both splits result in non-empty groups
if len(left_targets) > 0 and len(right_targets) > 0:
# Compute MSE for both the left and right splits
left_mse = self.mse(left_targets)
right_mse = self.mse(right_targets)
# Calculate the weighted average MSE of the two splits
total_average_mse = left_mse * len(left_targets)/len(y) + right_mse * len(right_targets)/len(y)
# If this split provides a better (lower) MSE, update the best split found
                    if total_average_mse < best_mse:
                        best_mse = total_average_mse
                        best_feature = feature_num
                        best_split_value = value
                        best_left_y = left_targets
                        best_right_y = right_targets
        # Return the best split information (feature index, value, and target splits)
        return best_split_value, best_feature, best_left_y, best_right_y
# Function to recursively build the decision tree
def _build_tree(self, X, y, depth=0):
# Base case: If all targets are the same or max depth is reached, return the mean of the target values
if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
return np.mean(y)
# Find the best split for the data
best_split_value, best_feature, left_y, right_y = self._best_split(X, y)
# If no valid split was found, return the mean of the targets
if best_feature is None:
return np.mean(y)
# Split the data based on the best feature and split value
left_index = X[:, best_feature] <= best_split_value
right_index = X[:, best_feature] > best_split_value
# Recursively build the left and right subtrees
left_tree = self._build_tree(X[left_index], left_y, depth + 1)
right_tree = self._build_tree(X[right_index], right_y, depth + 1)
# Return the current node as a dictionary with the best split and its left and right subtrees
return {
'feature': best_feature,
'value': best_split_value,
'left': left_tree,
'right': right_tree
}
# Function to make a prediction for a single sample using the tree
def _single_prediction(self, tree, x):
# If the current tree node is a dictionary (not a leaf), recursively traverse the tree
if isinstance(tree, dict):
            if x[tree['feature']] <= tree['value']:  # use <=, matching the training split rule
return self._single_prediction(tree['left'], x) # Go left
else:
return self._single_prediction(tree['right'], x) # Go right
# If the current node is a leaf (not a dictionary), return the prediction (mean of targets)
else:
return tree
# Function to predict target values for a set of samples
def predict(self, X):
# For each sample in X, make a prediction by recursively traversing the tree
predictions = np.array([self._single_prediction(self.tree, x) for x in X])
return predictions
# Function to fit the decision tree to the training data
def fit(self, X, y):
# Build the tree by calling the recursive function with the training data
        self.tree = self._build_tree(X, y)
dt_reg = DecisionTreeRegressor()
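Because every internal node is a plain dictionary and every leaf is a mean, the fitted tree can be printed directly; a tiny added example on toy data (max_depth=2 is arbitrary).
# Added inspection of the nested-dict tree representation on toy data
X_toy = np.array([[1.0], [2.0], [3.0], [4.0]])
y_toy = np.array([1.0, 1.0, 10.0, 10.0])
toy_tree = DecisionTreeRegressor(max_depth=2)
toy_tree.fit(X_toy, y_toy)
print(toy_tree.tree)  # e.g. {'feature': 0, 'value': 2.0, 'left': 1.0, 'right': 10.0}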
plot_3d_regression(dt_reg, "Decision Tree", data_type='linear')
plot_3d_regression(dt_reg, "Decision Tree", data_type='ugly')
Random Forest Regression
class RandomForestRegressor():
def __init__(self, n_estimators, max_depth=None):
# Initializes the random forest regressor with the number of estimators (trees) and an optional maximum depth for each tree.
self.n_estimators = n_estimators
self.max_depth = max_depth
# Function for calculating the Mean Squared Error (MSE) of a split
def mse(self, y):
# MSE is calculated as the average of squared differences from the mean
return np.mean((y - np.mean(y)) ** 2)
# Function to find the best split at any given point (based on MSE)
    def _best_split(self, X, y):
        # Track the best split found with local variables so repeated calls stay independent
        best_mse = float('inf')  # Best MSE starts as infinity
        best_feature = None
        best_split_value = None
        best_left_y = None
        best_right_y = None
# Randomly sample 1/3 of the total features to consider for splitting
n_features_to_consider = max(1, X.shape[1] // 3)
random_feature_indices = np.random.choice(X.shape[1], size=n_features_to_consider, replace=False)
# Only consider the randomly selected features for splitting
feature_subset = X[:, random_feature_indices]
# Loop through the randomly selected features and find the best split
for i, feature_num in enumerate(random_feature_indices): # Map back to original feature indices
feature_values = np.unique(feature_subset[:, i])
for value in feature_values:
# Boolean indices based on the feature subset
left_index = feature_subset[:, i] <= value
right_index = feature_subset[:, i] > value
# Subset targets using these indices
left_targets = y[left_index]
right_targets = y[right_index]
# Ensure both sides have samples
if len(left_targets) > 0 and len(right_targets) > 0:
# Calculate the MSE for both the left and right subsets
left_mse = self.mse(left_targets)
right_mse = self.mse(right_targets)
total_average_mse = left_mse * len(left_targets) / len(y) + right_mse * len(right_targets) / len(y)
# If this split results in a better (lower) MSE, update the best split
                    if total_average_mse < best_mse:
                        best_mse = total_average_mse
                        best_feature = feature_num
                        best_split_value = value
                        best_left_y = left_targets
                        best_right_y = right_targets
        # Return the best split information (feature index, value, and target splits)
        return best_split_value, best_feature, best_left_y, best_right_y
# Function to recursively build the decision tree
def _build_tree(self, X, y, depth=0):
# Base case: If all targets are the same or max depth is reached, return the mean of the target values
if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
return np.mean(y)
# Find the best split for the data
best_split_value, best_feature, left_y, right_y = self._best_split(X, y)
# If no valid split was found, return the mean of the targets
if best_feature is None:
return np.mean(y)
# Split the data based on the best feature and split value
left_index = X[:, best_feature] <= best_split_value
right_index = X[:, best_feature] > best_split_value
# Recursively build the left and right subtrees
left_tree = self._build_tree(X[left_index], left_y, depth + 1)
right_tree = self._build_tree(X[right_index], right_y, depth + 1)
# Return the current node as a dictionary with the best split and its left and right subtrees
return {
'feature': best_feature,
'value': best_split_value,
'left': left_tree,
'right': right_tree
}
# Function to make a prediction for a single sample using the tree
def _single_prediction(self, tree, x):
# If the current tree node is a dictionary (not a leaf), recursively traverse the tree
if isinstance(tree, dict):
            if x[tree['feature']] <= tree['value']:  # use <=, matching the training split rule
return self._single_prediction(tree['left'], x) # Go left
else:
return self._single_prediction(tree['right'], x) # Go right
# If the current node is a leaf (not a dictionary), return the prediction (mean of targets)
else:
return tree
# Function to predict target values for a set of samples
def predict(self, X):
predictions = []
# For each tree in the random forest, make predictions for all samples in X
for tree in self.trees:
model_predictions = np.array([self._single_prediction(tree, x) for x in X])
predictions.append(model_predictions)
# Combine all the predictions of all the trees and average across the trees
all_predictions = np.column_stack(predictions)
all_predictions = np.mean(all_predictions, axis=1)
return all_predictions
# Function to train the random forest model
def fit(self, X, y):
models = []
# Create n decision trees, each trained with bootstrapping (sampling with replacement)
for n in range(self.n_estimators):
random_row_indices = np.random.choice(len(X), len(X), replace=True)
subset_X = X[random_row_indices]
subset_y = y[random_row_indices]
tree = self._build_tree(subset_X, subset_y)
# Add each trained tree to the list of models
models.append(tree)
# Save all the trained trees
        self.trees = models
rf_regressor = RandomForestRegressor(10)
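Averaging many bootstrapped trees mainly reduces variance; the added sketch below compares a single fully grown tree with a 25-tree forest on a held-out split (illustrative names, and exact numbers depend on the random draws).
# Added variance-reduction check: a single deep tree vs. a bagged forest
X_rf, y_rf = make_regression(n_samples=300, n_features=2, noise=15, random_state=42)
y_rf = y_rf + 100 * (X_rf[:, 0] * X_rf[:, 1])  # add an interaction term
X_tr, X_te, y_tr, y_te = train_test_split(X_rf, y_rf, test_size=0.25, random_state=0)
single_tree = DecisionTreeRegressor()
single_tree.fit(X_tr, y_tr)
forest = RandomForestRegressor(n_estimators=25)
forest.fit(X_tr, y_tr)
print(mean_squared_error(y_te, single_tree.predict(X_te)))
print(mean_squared_error(y_te, forest.predict(X_te)))  # typically lower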
plot_3d_regression(rf_regressor, "Random Forest Regressor", "linear")
plot_3d_regression(rf_regressor, "Random Forest Regressor", "ugly")
Gradient Boosting Regression
class GradientBoostingRegressor:
def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=3):
# Initialize model with given number of estimators, learning rate, and max depth for each tree
self.n_estimators = n_estimators
self.learning_rate = learning_rate
self.max_depth = max_depth
    def fit(self, X, y):
        # Store the initial baseline (mean of targets) so predict() can reuse it
        self.initial_prediction = np.mean(y)
        # Initialize predictions as the mean of the target values
        self.y_pred = np.full_like(y, self.initial_prediction, dtype=np.float64)
        self.trees = []
for _ in range(self.n_estimators):
# Calculate residuals
residuals = y - self.y_pred
# Fit a tree to the residuals
tree = DecisionTreeRegressor(max_depth=self.max_depth)
tree.fit(X, residuals)
# Update predictions with the tree's output scaled by learning rate
self.y_pred += self.learning_rate * tree.predict(X)
self.trees.append(tree)
    def predict(self, X):
        # Start from the stored baseline (the training-target mean), not the mean
        # of the final training predictions, which would drift from the true baseline
        y_pred = np.full(X.shape[0], self.initial_prediction, dtype=np.float64)
# Sum the residual predictions from all trees
for tree in self.trees:
y_pred += self.learning_rate * tree.predict(X)
        return y_pred
boost_regressor = GradientBoostingRegressor(n_estimators=50)
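Since fit stores the initial mean prediction and every tree, the training error after each boosting stage can be replayed; the sketch below is an addition with illustrative names.
# Added staged-error check: training MSE should fall as trees accumulate
X_gb, y_gb = make_regression(n_samples=300, n_features=2, noise=15, random_state=42)
gb_check = GradientBoostingRegressor(n_estimators=20)
gb_check.fit(X_gb, y_gb)
staged = np.full(X_gb.shape[0], gb_check.initial_prediction, dtype=np.float64)
for i, tree in enumerate(gb_check.trees, start=1):
    staged += gb_check.learning_rate * tree.predict(X_gb)
    if i % 5 == 0:
        print(i, mean_squared_error(y_gb, staged))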
plot_3d_regression(boost_regressor, "Gradient Boost Regressor", "linear")
plot_3d_regression(boost_regressor, "Gradient Boost Regressor", "ugly")